Spark Streaming examples
Use a socket server to publish data from a file to port 9000 (a minimal server sketch appears in example 2 below).
# -*- coding: utf-8 -*-
import os
import sys
os.chdir("/home/cloudops/spark")
os.getcwd()  # verify the working directory
# Configure the environment. Set this to the directory where
# Spark is installed.
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/opt/spark'
# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']
# Add the following paths to the system path.
# Please check your installation to make sure that these zip files
# actually exist. The names might change as versions change.
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib", "pyspark.zip"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib", "py4j-0.9-src.zip"))
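# (Alternative, not from the original notes) the 'findspark' package can
# do this path setup automatically:
#   import findspark
#   findspark.init()  # reads SPARK_HOME and patches sys.path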
# Initialize the Spark context. Once this is done, all other applications can run.
from pyspark import SparkContext
from pyspark import SparkConf
# Optionally configure Spark Settings
conf = SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "2")
conf.setAppName("Spark-Streaming")
# <pyspark.conf.SparkConf at 0x7fddbb2eb588>
# If a SparkContext already exists (here one named "Spark-Test" from an
# earlier run), creating another raises:
# ValueError: Cannot run multiple SparkContexts at once;
# existing SparkContext(app=Spark-Test, master=local) created by __init__
# =====================================
# Initialize SparkContext. Run this only once!
# Otherwise you get the "multiple SparkContexts" error above.
# For streaming, create a Spark context with 2 local threads:
# * one thread runs the receiver
# * the other processes the received micro-batches
# Run sc.stop() before you create a new SparkContext,
# or use sc = SparkContext.getOrCreate()
# instead of sc = SparkContext().
# sc.stop()
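# For example, this re-runnable pattern (not in the original notes)
# avoids the error entirely:
#   conf.setMaster("local[2]")
#   sc = SparkContext.getOrCreate(conf=conf)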
sc = SparkContext('local[2]', conf=conf)
# OK
# =====================================
from pyspark.streaming import StreamingContext
# =====================================
# 1. Streaming with simple data
# =====================================
vc = [[-0.1, -0.2],
      [0.1, 0.3],
      [1.1, 1.5],
      [0.9, 0.9]]
# Make each pair its own one-partition RDD; these will feed the queue stream
dvc = [sc.parallelize(i, 1) for i in vc]
# Stream the contents of the RDDs (queue stream): one RDD is dequeued
# per micro-batch interval (2 seconds here)
ssc = StreamingContext(sc, 2)
# Create an input DStream from the queue of RDDs
input_stream = ssc.queueStream(dvc)
def get_output(rdd):
    print(rdd.collect())
# output operation: apply get_output to every RDD in the stream
input_stream.foreachRDD(get_output)
# after stream definition - start it (see console output and jobs in UI)
ssc.start()
# [-0.1, -0.2]
# [0.1, 0.3]
# [1.1, 1.5]
# [0.9, 0.9]
# []
# []
# []
# . . .
# stop the streaming
ssc.stop()
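# Note: by default ssc.stop() also stops the underlying SparkContext;
# to keep sc alive you could call ssc.stop(stopSparkContext=False) instead.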
# =====================================
# 2. Streaming with TCP/IP data
# =====================================
# Needs a socket server (in Java or Python) that publishes data from a
# file; a minimal sketch follows.
# RESTART THE KERNEL and re-run the setup above (sc must exist again).
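# A minimal sketch of such a server in Python (assumptions: the data
# file is named 'reviews.txt' and one line is sent per second; run it
# in a separate terminal, not in this notebook):
#
#   import socket
#   import time
#
#   with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
#       server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#       server.bind(("localhost", 9000))
#       server.listen(1)
#       conn, addr = server.accept()   # Spark connects as the client
#       with conn, open("reviews.txt") as f:
#           for line in f:
#               conn.sendall(line.encode("utf-8"))
#               time.sleep(1)          # throttle: one line per second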
# Create a streaming context with a 3-second micro-batch interval
streamContext = StreamingContext(sc, 3)
totalLines = 0
# Listen on port 9000; every micro-batch arrives as a new 'lines' RDD
lines = streamContext.socketTextStream("localhost", 9000)
# =====================================
# 1. Word count for every 'lines' RDD
words = lines.flatMap(lambda line: line.split(" "))
# to count how often each word is repeated, pair each word with a count of 1
pairs = words.map(lambda word: (word, 1))
# for every key (word in pairs) keep adding a count
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# pprint is a streaming-specific print; show the first 5 elements of each batch
wordCounts.pprint(5)
# =====================================
# 2. Another job - Count lines
totalLines = 0
linesCount = 0
def computeMetrics(rdd):
    global totalLines
    global linesCount
    linesCount = rdd.count()
    totalLines += linesCount
    print(rdd.collect())
    print("Lines in RDD :", linesCount, " Total Lines:", totalLines)
lines.foreachRDD(computeMetrics)
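# Note: the function passed to foreachRDD runs in the driver process,
# which is why updating the global counters above works.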
# =====================================
# 3. Another job - Compute window metrics
def windowMetrics(rdd):
    print("Window RDD size:", rdd.count())
# window length = 6 s, a multiple of the 3 s micro-batch interval,
# so each window holds 2 micro-batches (current and previous);
# slide interval = 3 s, i.e. the window advances every micro-batch
windowedRDD = lines.window(6, 3)
windowedRDD.foreachRDD(windowMetrics)
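# A possible extension (not in the original notes): window the word
# counts directly with reduceByKeyAndWindow; with no inverse function
# (None) no checkpoint directory is required. 6 s window, 3 s slide,
# matching the window() call above:
#   windowedCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y,
#                                               None, 6, 3)
#   windowedCounts.pprint(5)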
# =====================================
streamContext.start()
# -------------------------------------------
# Time: 2019-03-19 17:11:18
# -------------------------------------------
# ('da', 1)
# ('sucked...', 1)
# ('Client', 1)
# ('on', 1)
# ('127.0.0.1ERM', 1)
# ...
# ['Client on 127.0.0.1ERM da vinci code and it sucked...']
# Lines in RDD : 1 Total Lines: 1
# Window RDD size: 1
# . . .
# -------------------------------------------
# Time: 2019-03-19 17:11:57
# -------------------------------------------
# ('really', 1)
# ('like', 1)
# ('The', 1)
# ('Da', 1)
# ('Vinci', 1)
# ...
# ['I really like The Da Vinci Code.']
# Lines in RDD : 1 Total Lines: 20
# Window RDD size: 3
streamContext.stop()
print("Overall lines :", totalLines)
# Overall lines : 20